582bdba4b201e6ab8e2a9a05cff3566f1bab9dce,src/java/org/apache/cassandra/streaming/StreamReader.java,StreamReader,read,#ReadableByteChannel#,85
Before Change
long totalSize = totalSize();
Pair<String, String> kscf = Schema.instance.getCF(cfId);
if (kscf == null)
{
// schema was dropped during streaming
throw new IOException("CF " + cfId + " was dropped during streaming");
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
BytesReadTracker in = new BytesReadTracker(dis);
After Change
long totalSize = totalSize();
Pair<String, String> kscf = Schema.instance.getCF(cfId);
ColumnFamilyStore cfs = null;
if (kscf != null)
cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
if (kscf == null || cfs == null)
{
// schema was dropped during streaming
throw new IOException("CF " + cfId + " was dropped during streaming");
}
logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
cfs.getColumnFamilyName());
DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
BytesReadTracker in = new BytesReadTracker(dis);
SSTableWriter writer = null;
DecoratedKey key = null;
try
{
writer = createWriter(cfs, totalSize, repairedAt, format);
while (in.getBytesRead() < totalSize)
{
key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
writeRow(key, writer, in, cfs);
// TODO move this to BytesReadTracker
session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
}
logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
return writer;
} catch (Throwable e)
{
if (key != null)
logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
if (writer != null)
{
try